package com.young.rocketmq.simpleexample.producer;

import com.young.rocketmq.config.Const;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 可靠异步
 * Send Messages Asynchronously
 * Asynchronous transmission is generally used
 * in response time sensitive business scenarios
 * 异步传输通常用于响应时间敏感的业务场景
 *
 * @author ：<a href="mailto:youngkun2016@163.com">young</a>
 * @date ：Created in 2020/3/29
 */
public class AsyncProducer {
    private static Logger logger = LogManager.getLogger(AsyncProducer.class);

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer(Const.GROUP);
        // Specify name server addresses.
        producer.setNamesrvAddr(Const.NAME_SRV);
        //Launch the instance.
        producer.start();
        //发送失败的情况下 最大的尝试次数（默认值2）
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //防止消息没有完成消费，主程序已经exit
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            final int index = i;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message(Const.TOPIC, "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //回调函数处理
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info(String.format("%-10d OK %s %n", index, sendResult.getMsgId()));
                    countDownLatch.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    logger.error(String.format("%-10d Exception %s %n", index, e));
                    countDownLatch.countDown();
                }
            });
        }
        // countDownLatch计数器减为0后，再等待5s
        countDownLatch.await(5, TimeUnit.SECONDS);
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}
